package org.voovan.korla;

import org.junit.Before;
import org.junit.Test;
import org.voovan.korla.message.Callback;
import org.voovan.korla.message.Msg;
import org.voovan.korla.socket.KorlaConsumerPool;
import org.voovan.korla.socket.KorlarConsumer;
import org.voovan.tools.TEnv;
import org.voovan.tools.serialize.TSerialize;

import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 连接池
 *
 * @author: helyho
 * korla Framework.
 * WebSite: https://github.com/helyho/korla
 * Licence: Apache v2 License
 */
public class KorlarConsumerManagerUnit {
    private static boolean sendOnConnect = false;
    private transient KorlaConsumerPool korlaConsumerPool;

    static {
        TSerialize.register(TimeMsg.class);
    }

    @Before
    public void before() throws IOException {
        korlaConsumerPool = new KorlaConsumerPool("127.0.0.1", 3333, 500000, 500000, 20, 30);

//        korlaConsumerPool.restore();
    }

    @Test
    public void testCallback()  {
        ConcurrentHashMap<Integer, KorlarConsumer> aa3 = new ConcurrentHashMap<>();


        ArrayList<Thread> ts = new ArrayList<>();

        AtomicInteger mm = new AtomicInteger(1);
        for(int p=0;p<4;p++) {
            Thread thread = new Thread(()->{
                KorlarConsumer korlarConsumer = null;
                for(int i=0;i<10000;i++) {
                    try {
                        if(i%300==0) {
                            TEnv.sleep(1);
                        }
                        korlarConsumer = korlaConsumerPool.getConsumer(1000000l);
                        aa3.put(korlarConsumer.hashCode(), korlarConsumer);

                        TimeMsg msg = new TimeMsg();
                        msg.setSeq(mm.getAndIncrement());

                        //业务回调
                        msg.setCallBack(new Callback() {
                            @Override
                            public Msg apply(Msg t) {
                                System.out.println(t + " S"+System.currentTimeMillis());
                                return null;
                            }
                        });

                        korlarConsumer.send(msg);

//                        aa2.put(msg.getId(), msg);
//                        System.out.println(msg.getResponse(1000) + " " + mm.getAndIncrement());

//                        if (i % 40 == 0) {
//                            client.close();
//                        }
                    } catch (TimeoutException e) {
                        e.printStackTrace();
                    } finally {
                        korlaConsumerPool.restitution(korlarConsumer);
                    }
                }

                TEnv.sleep(2000);
                System.out.println("done " + mm.get());

            });

            ts.add(thread);
            thread.start();
        }

        for(Thread t : ts){
            try {
                t.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        int m = 0;
        while (m<10000000){
            TEnv.sleep(1);
            m++;
            if(m%1000 == 0) {
                System.out.println("Cache size: " +KorlaStatic.MESSAGE.size());
            }
        }
    }
}
